package com.disney.rocketmq.sdk.base;

import com.disney.domain.sdk.anno.MQProducer;
import com.disney.domain.sdk.mq.DomainMessage;
import com.disney.domain.sdk.mq.MessageHeader;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.messaging.Message;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.Optional;

/**
 * Base class for RocketMQ producers whose topic and tag are declared on the
 * concrete subclass through the {@link MQProducer} annotation.
 *
 * <p>After Spring has injected the {@link RocketMQTemplate}, {@link #afterPropertiesSet()}
 * reads the annotation and caches the destination string that every send method uses.
 *
 * @author issavior
 */
@Slf4j
public class BaseProducer implements InitializingBean {

    @Resource
    private RocketMQTemplate rocketMqTemplate;

    /** Topic value read from the {@link MQProducer} annotation. */
    private String topic;

    /** Tag value read from the {@link MQProducer} annotation. */
    private String tag;

    /**
     * Destination handed to {@link RocketMQTemplate}; the plain concatenation of topic and tag.
     * NOTE(review): no separator is inserted here — presumably the tag value already carries
     * the ':' that RocketMQ destinations use between topic and tag; confirm against the enum.
     */
    private String destination;


    /**
     * Resolves topic, tag and destination from the {@link MQProducer} annotation
     * on the runtime class.
     *
     * @throws IllegalStateException if the runtime class is not annotated with {@link MQProducer}
     */
    @Override
    public void afterPropertiesSet() {
        MQProducer anno = this.getClass().getAnnotation(MQProducer.class);
        if (anno == null) {
            // Fail with an explicit message instead of an anonymous NullPointerException.
            throw new IllegalStateException(
                    this.getClass().getName() + " must be annotated with @MQProducer");
        }
        this.topic = anno.topic().getValue();
        this.tag = anno.tag().getValue();
        this.destination = this.topic + this.tag;
        log.debug("{}初始化:topic={},tag={}", this.getClass().getName(), topic, tag);
    }

    /**
     * Sends the message to the configured destination via {@code convertAndSend}.
     *
     * @param domainMessage carrier of the payload and the message key
     */
    public void sent(DomainMessage<?> domainMessage) {
        // Use the cached destination, like every other send method in this class.
        rocketMqTemplate.convertAndSend(destination, buildMessage(domainMessage));
    }

    /**
     * Sends the message to the configured destination via {@code convertAndSend},
     * passing the given post-processor along.
     *
     * @param domainMessage        carrier of the payload and the message key
     * @param messagePostProcessor post-processor forwarded to the template
     */
    public void sent(DomainMessage<?> domainMessage, MessagePostProcessor messagePostProcessor) {
        rocketMqTemplate.convertAndSend(destination, buildMessage(domainMessage), messagePostProcessor);
    }

    /**
     * Sends the message synchronously and logs the outcome.
     *
     * <p>When the message carries a delay level it is forwarded to the template together with
     * the timeout; otherwise only the timeout is passed. A result with status
     * {@link SendStatus#SEND_OK} is logged at debug level, anything else at warn level.
     *
     * @param domainMessage carrier of the payload, key, timeout and optional delay level
     */
    public void syncSend(DomainMessage<?> domainMessage) {
        Message<?> message = buildMessage(domainMessage);
        Integer delayLevel = domainMessage.getDelayLevel();

        // Exactly one send is issued; the branch only decides whether a delay level is attached.
        SendResult result;
        if (delayLevel != null) {
            result = rocketMqTemplate.syncSend(destination, message, domainMessage.getTimeout(), delayLevel);
        } else {
            result = rocketMqTemplate.syncSend(destination, message, domainMessage.getTimeout());
        }

        if (result != null && result.getSendStatus() == SendStatus.SEND_OK) {
            log.debug("MQ发送成功,result = {}", result);
        } else {
            // Previously a non-OK (or missing) result went unreported.
            log.warn("MQ发送失败,result = {}", result);
        }
    }

    /**
     * Sends the message asynchronously; the outcome is reported to the supplied callback.
     * The delay level of the message is not consulted by this method.
     *
     * @param domainMessage carrier of the payload, key and timeout
     * @param sendCallback  callback forwarded to the template
     */
    public void asyncSend(DomainMessage<?> domainMessage, SendCallback sendCallback) {
        rocketMqTemplate.asyncSend(destination, buildMessage(domainMessage), sendCallback, domainMessage.getTimeout());
    }

    /** Wraps the domain payload in a Spring message whose {@code MessageHeader.KEY} header holds the message key. */
    private static Message<?> buildMessage(DomainMessage<?> domainMessage) {
        return MessageBuilder.withPayload(domainMessage.getMessage())
                .setHeader(MessageHeader.KEY, domainMessage.getKey())
                .build();
    }

}
